/*
 * Copyright 2011 PA Consulting Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.prodeagle.java.servlets;

import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.json.simple.JSONObject;

import com.google.appengine.api.users.UserService;
import com.google.appengine.api.users.UserServiceFactory;
import com.google.apphosting.api.DeadlineExceededException;
import com.prodeagle.java.Authentication;
import com.prodeagle.java.MemCacheManager;
import com.prodeagle.java.ProdEagleConstants;
import com.prodeagle.java.counters.CounterNamesManager;
import com.prodeagle.java.counters.CounterUtil;

public class HarvestHandler extends HttpServlet implements ProdEagleConstants {
	/**
	 * Auto-generated for Serializable
	 */
	private static final long serialVersionUID = 7461496095351578024L;

	private static final Logger _logger = Logger.getLogger(HarvestHandler.class.getName());
	
	private static final UserService userService = UserServiceFactory.getUserService();
	
	private static final List<String> LOST_DATA_CHECK_KEYS = buildLostDataCheckKeys();
	private static final Map<String, Long> LOST_DATA_CHECK_VALUES = buildLostDataCheckValues();
	
	public void doGet(HttpServletRequest req, HttpServletResponse resp) {
		try {
			//check for the existence of ?viewer=xyz or ?administrator=xyz
			if (null != req.getParameter("viewer") || null != req.getParameter("administrator")) {
				_logger.info("Adding user");
				Authentication.addUser(req, resp);
			} else if (Authentication.isProdEagle(req, resp) || Authentication.isAdministrator(req, resp)) { //check prodeagle first, because admin does some redirecting
				String deleteCounter = req.getParameter("delete_counter");
				if (null != deleteCounter && !deleteCounter.isEmpty()) {
					_logger.info("Deleting counter: " + deleteCounter);
					CounterNamesManager cnm = CounterUtil.getDefaultCounterNamesManager();
					cnm.delete(Collections.singleton(deleteCounter));
				} else {
					_logger.info("Creating report");
					createReport(req, resp);
				}
			}
		} catch (DeadlineExceededException e) {
			_logger.severe("Unexpected DeadlineExceededException: " + e);
		} catch (Exception e) {
			_logger.severe("Unexpected exception: " + e);
		}
	}

	public void doPost(HttpServletRequest req, HttpServletResponse resp) {
		_logger.info(req.getParameterMap().toString());
		
		doGet(req, resp);
	}
	
	private Boolean isProductionCall(HttpServletRequest req) {
		String productionCallString = req.getParameter(PRODUCTION_CALL);
		
		Boolean isProductionCall = false;
		if (null != productionCallString && productionCallString.equalsIgnoreCase("1")) {
			isProductionCall = true;
		}
		
		_logger.info("Is production call? " + isProductionCall);
		
		return isProductionCall;
	}

	private long getLastSlot(HttpServletRequest req) {
		String lastSlotString = req.getParameter("last_time");
		
		Long lastSlot;
		if (null == lastSlotString) {
			_logger.info("Last slot not in parameters");
			long time = new Date().getTime() - MAX_LOOK_BACK;
			lastSlot = CounterUtil.getEpochRounded(new Date(time), MAX_LOOK_BACK);
		} else {
			_logger.info("Last slot in parameters");
			lastSlot = Long.parseLong(lastSlotString) * 1000; //turn it into milliseconds (because prodeagle gives it in seconds)
		}
		
		_logger.info("Last slot: " + lastSlot + " (or as a date: " + new Date(lastSlot).toString() + ")");
		return lastSlot;
	}
	
	@SuppressWarnings("unchecked")
	private void createReport(HttpServletRequest req, HttpServletResponse resp) {
		Boolean isProductionCall = isProductionCall(req);
		
		long slot = CounterUtil.getEpochRounded();
		
		JSONObject result = initaliseDefaultResult(slot);
		
		CounterNamesManager cnm = CounterUtil.getDefaultCounterNamesManager();
		
		Boolean allDataInaccurate = wasDataLostSinceLastHarvest(NAMESPACE, slot, isProductionCall);
		
		Set<String> allCounterNames = cnm.allCounterNames();
		Set<String> updateKeys = new HashSet<String>();
		Set<Object[]> updates = new HashSet<Object[]>();
		
		long lastSlot = getLastSlot(req);
		while (slot >= lastSlot) {
			long gap = new Date().getTime();
			
			Map<String, Object> slotUpdates = MemCacheManager.getMultipleCounters(allCounterNames, String.valueOf(slot), NAMESPACE);
						
			result.put("ms_of_data_lost", computeMsOfDataLost(gap, (Long) result.get("ms_of_data_lost")));
			
			updates.add(new Object[]{ slot, slotUpdates });
			updateKeys.addAll(slotUpdates.keySet());
			
			slot -= MIN_SLOT_SIZE;
		}

		if (isProductionCall) {
			MemCacheManager.deleteMulti(updateKeys, NAMESPACE);
		}
		
		if (!allDataInaccurate) {
			//have we lost any data since we first checked?
			allDataInaccurate = wasDataLostSinceLastHarvest(NAMESPACE, slot);
		}
		
		_logger.info("All data inaccurate? " + allDataInaccurate);
		//store the result of wasDataLost
		result.put("all_data_inaccurate", allDataInaccurate);
		
		buildCounterJSON(result, updateKeys, updates);

		writeReport(req, resp, isProductionCall, result);
	}

	@SuppressWarnings("unchecked")
	private void buildCounterJSON(JSONObject result, Set<String> updateKeys, Set<Object[]> updates) {
		try {
			for (String updateKey : updateKeys) {
				JSONObject counters = (JSONObject) result.get("counters");
	
				updateKey = updateKey.substring(updateKey.indexOf("_") + 1, updateKey.length());
				if (!counters.containsKey(updateKey)) {
					counters.put(updateKey, new JSONObject());
				}
				
				for (Object[] update : updates) {
					Long _slot = (Long) update[0];
					Map<String, Long> _slotUpdates = (Map<String, Long>) update[1];
					
					try {
						Object delta = _slotUpdates.get(_slot + "_" + updateKey);
						
						if (null != delta) {
							JSONObject x = (JSONObject) counters.get(updateKey);
							x.put(_slot / 1000, delta); //prodeagle time stamps are in seconds, not milliseconds
							counters.put(updateKey, x);
						}
					} catch (Exception e) {
						_logger.info("Test: " + e);
					}
				}
			}
		} catch (Exception e) {
			_logger.severe("Unexpected exception: " + e);
		}
		
		_logger.info("Creating counter JSON complete");
	}

	/**
	 * Write the response out. If the request is for json / production
	 * then write out the JSON string without anything else.
	 * 
	 * If not json/production, write out a helpful list of the counters
	 * and their values
	 * 
	 * @param req - the request
	 * @param resp - the response
	 * @param isProductionCall - is this a call from ProdEagle.com?
	 * @param result - the JSON to be written
	 */
	private void writeReport(HttpServletRequest req, HttpServletResponse resp, Boolean isProductionCall, JSONObject result) {
		try {
			if (isProductionCall || null != req.getParameter("json")) {
				resp.setContentType("text/plain; charset=utf-8");
				
				String jsonString = result.toJSONString();
				
				resp.getWriter().print(jsonString);
			} else {
				resp.setContentType("text/html");
				PrintWriter writer = resp.getWriter();
				writer.print("<html><head><title>ProEagle Stats</title><style> td { padding: 5px }</style></head><body>");
				writer.print(String.format("<h3>Data since last export: %s UTC</h3>", new Date()));
				writer.print("<a href='http://www.prodeagle.com'>Go to ProdEagle dashboard</a>");
				writer.print(String.format("<br /><br /><a href='%s'>Logout</a>", 
						userService.createLogoutURL(req.getRequestURI())));
				
				JSONObject counters = (JSONObject) result.get("counters");

				writer.print("<table><tr><td>Number of counters:</td><td>" + counters.size() + "</td></tr></table>");
				
				@SuppressWarnings("unchecked")
				List<String> sortedKeys = new ArrayList<String>(counters.keySet());
				Collections.sort(sortedKeys);
				
				for (Object counterKey : sortedKeys) {
					String counterName = (String) counterKey;
					JSONObject json = (JSONObject) counters.get(counterName);
					
					if (!json.isEmpty()) {
						writer.print("<table><thead><th colspan=\"3\">" + counterName + "</th></thead><tbody>");
						
						for (Object key : json.keySet()) {
							writer.print("<tr><td>");
							writer.print(key);
							writer.print("</td><td style=\"border-left: 1px solid grey; border-right: 1px solid grey;\">");
							writer.print(new Date((Long) key * 1000).toString()); //modify the dates back to include milliseconds
							writer.print("</td><td>");
							writer.print(json.get(key));
							writer.print("</td></tr>");
						}
						writer.print("</tbody></table>");
					}
				}
				writer.print("</body></html>");
				writer.flush();
			}
		} catch (IOException e) {
			_logger.severe("Failure to write response: " + e);
		}
		
		_logger.info("Writing Harvest report complete");
	}

	/**
	 * 
	 * @param gap - 
	 * @param currentMsOfDataLost
	 * @return
	 */
	private long computeMsOfDataLost(long gap, long currentMsOfDataLost) {
		long thisGap = (new Date().getTime() - gap);
		long max = Math.max(thisGap, currentMsOfDataLost);
		
		return max;
	}

	private Boolean wasDataLostSinceLastHarvest(String namespace, long slot) {
		return wasDataLostSinceLastHarvest(namespace, slot, false);
	}
	
	private Boolean wasDataLostSinceLastHarvest(String namespace, long slot, Boolean isProductionCall) {
		_logger.info("Determining if data was lost since last harvest");
		
		//fetch, async, from Memcache values for those keys
		Future<Map<String, Object>> lostDataCheckFuture = MemCacheManager.getMultipleCounters(LOST_DATA_CHECK_KEYS, NAMESPACE);
		
		Future<Map<String, Long>> storeMultipleCountersResult = null;
		
		try {
			//actually wait for the results, no longer than 5 seconds - must be done before the deleteMulti call below.
			Map<String, Object> lostDataCheck = lostDataCheckFuture.get(5, TimeUnit.SECONDS);
			
			if (isProductionCall) {
				//if it's production, delete the existing keys from MemCache
				MemCacheManager.deleteMulti(LOST_DATA_CHECK_KEYS, NAMESPACE);
				
				//Store a Map of those keys, all initialised to a value of 1. This needs to be complete before the method exits though (see finally block)
				storeMultipleCountersResult = MemCacheManager.storeMultipleCounters(LOST_DATA_CHECK_VALUES, NAMESPACE, 0L);
			}

			if (lostDataCheck.values().size() != EXPECTED_MEMCACHE_SERVERS) {
				_logger.warning("ProdEagle counters lost before " + slot);
				return true;
			}
		} catch (Exception e) {
			_logger.warning("Exception while retrieving future from Memcache. Some metrics may be lost. Exception: " + e);
		} finally {
			if (null != storeMultipleCountersResult) { //i.e. not production
				try {
					//force a wait for the result of the storage of the new lost data counters
					storeMultipleCountersResult.get(10, TimeUnit.SECONDS);
				} catch (Exception e) {
					_logger.warning("Failed to store the new ProdEagle counters for checking of lost data. Some metrics may be lost in future calls. Exception: " + e);
				}
			}
		}
		
		_logger.info("Data was not lost before "+ slot);
		return false;
	}

	/**
	 * Build a list of length N, where N is the EXPECTED_MEMCACHE_SERVERS count
	 * which consists of a list of keys of the form "last_slot_0, last_slot_1 etc
	 * 
	 * @return a List of length EXPECTED_MEMCACHE_SERVERS
	 */
	private static List<String> buildLostDataCheckKeys() {
		//build an array of keys of the form "last_slot_x", where x is a number.
		List<String> lostDataCheckKeys = new ArrayList<String>(EXPECTED_MEMCACHE_SERVERS);
		
		int i = 0;
		while (i < EXPECTED_MEMCACHE_SERVERS) {
			lostDataCheckKeys.add("last_slot_" + i);
			i++;
		}
		
		return lostDataCheckKeys;
	}
	
	/**
	 * Build a static Map of length N where N is the EXPECTED_MEMCACHE_SERVERS count
	 * which consists of a list of keys of the form "last_slot_0, last_slot_1 etc" with 
	 * values 1
	 * 
	 * @return a Map of length EXPECTED_MEMCACHE_SERVERS
	 */
	private static Map<String, Long> buildLostDataCheckValues() {
		Map<String, Long> lostDataCheckValues = new HashMap<String, Long>(EXPECTED_MEMCACHE_SERVERS);
		
		for (String key : LOST_DATA_CHECK_KEYS) {
			lostDataCheckValues.put(key, 1L);
		}
		
		return lostDataCheckValues;
	}

	@SuppressWarnings("unchecked")
	/**
	 * Creates a default result, with no results yet filled
	 * @param slot - the epoch time
	 * @return - a JSON Object with time, counters, ms_of_data_lost and version all set
	 */
	private JSONObject initaliseDefaultResult(long slot) {
		JSONObject json = new JSONObject();
		json.put("time", slot / 1000); //turn into a python time (i.e. without milliseconds)
		json.put("counters", new JSONObject());
		json.put("ms_of_data_lost", 0L);
		json.put("version", 1.0);
		
		return json;
	}
}
